package com.wugui.datax.rpc.remoting.net.impl.netty.server;

import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.wugui.datax.rpc.remoting.net.params.Beat;
import com.wugui.datax.rpc.remoting.net.params.XxlRpcRequest;
import com.wugui.datax.rpc.remoting.net.params.XxlRpcResponse;
import com.wugui.datax.rpc.remoting.provider.XxlRpcProviderFactory;
import com.wugui.datax.rpc.util.ThrowableUtil;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * Netty inbound handler for the RPC provider side.
 *
 * <p>Each decoded {@link XxlRpcRequest} is either recognised as a heartbeat
 * (and dropped) or handed to a worker pool, where it is invoked through the
 * {@link XxlRpcProviderFactory} and the resulting {@link XxlRpcResponse} is
 * written back on the same channel. Idle channels are closed when an
 * {@link IdleStateEvent} arrives.
 *
 * @author xuxueli 2015-10-29 20:07:37
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<XxlRpcRequest> {
	private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

	/** Resolves and invokes the target service for a request. */
	private final XxlRpcProviderFactory xxlRpcProviderFactory;
	/** Pool on which service invocations run, keeping them off the thread that called this handler. */
	private final ThreadPoolExecutor serverHandlerPool;

	/**
	 * @param xxlRpcProviderFactory factory used to invoke the requested service
	 * @param serverHandlerPool     executor on which invocations are run
	 */
	public NettyServerHandler(final XxlRpcProviderFactory xxlRpcProviderFactory,
			final ThreadPoolExecutor serverHandlerPool) {
		this.xxlRpcProviderFactory = xxlRpcProviderFactory;
		this.serverHandlerPool = serverHandlerPool;
	}

	/**
	 * Handles one inbound request.
	 *
	 * <p>Requests whose id matches {@link Beat#BEAT_ID} (case-insensitively) are
	 * heartbeat pings: they are logged at debug level and get no response. Any
	 * other request is submitted to the worker pool. If submission fails, or the
	 * invocation itself throws, an error response carrying the request id and
	 * the stringified exception is written back so the request is always
	 * answered.
	 *
	 * @param ctx           channel context used to write the response
	 * @param xxlRpcRequest the decoded request
	 */
	@Override
	public void channelRead0(final ChannelHandlerContext ctx, final XxlRpcRequest xxlRpcRequest) throws Exception {

		// filter beat
		if (Beat.BEAT_ID.equalsIgnoreCase(xxlRpcRequest.getRequestId())) {
			logger.debug(">>>>>>>>>>> xxl-rpc provider netty server read beat-ping.");
			return;
		}

		// do invoke
		try {
			serverHandlerPool.execute(new Runnable() {
				@Override
				public void run() {
					try {
						// invoke + response
						XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);

						ctx.writeAndFlush(xxlRpcResponse);
					} catch (Exception e) {
						// The outer try/catch only covers task submission; without this
						// guard a failure on the pool thread would leave the request
						// unanswered.
						writeErrorResponse(ctx, xxlRpcRequest, e);
					}
				}
			});
		} catch (Exception e) {
			// submission failed (e.g. the pool refused the task)
			writeErrorResponse(ctx, xxlRpcRequest, e);
		}

	}

	/**
	 * Logs the failure and writes an error response for the given request.
	 *
	 * @param ctx           channel context used to write the response
	 * @param xxlRpcRequest request that could not be served; its id is echoed back
	 * @param e             the failure to report
	 */
	private void writeErrorResponse(final ChannelHandlerContext ctx, final XxlRpcRequest xxlRpcRequest,
			final Exception e) {
		logger.error(">>>>>>>>>>> xxl-rpc provider netty server failed to handle request", e);

		XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
		xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
		xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e));

		ctx.writeAndFlush(xxlRpcResponse);
	}

	/**
	 * Logs the exception and closes the channel.
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		logger.error(">>>>>>>>>>> xxl-rpc provider netty server caught exception", cause);
		ctx.close();
	}

	/**
	 * Closes the channel when an {@link IdleStateEvent} is received; every other
	 * event is delegated to the superclass.
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		if (evt instanceof IdleStateEvent) {
			ctx.channel().close(); // beat 3N, close if idle
			logger.debug(">>>>>>>>>>> xxl-rpc provider netty server close an idle channel.");
		} else {
			super.userEventTriggered(ctx, evt);
		}
	}

}
